1 package org.apache.lucene.replicator;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import java.io.Closeable;
21 import java.io.IOException;
22 import java.util.HashMap;
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.atomic.AtomicInteger;
25
26 import org.apache.lucene.document.Document;
27 import org.apache.lucene.index.DirectoryReader;
28 import org.apache.lucene.index.IndexWriter;
29 import org.apache.lucene.index.IndexWriterConfig;
30 import org.apache.lucene.index.SnapshotDeletionPolicy;
31 import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
32 import org.apache.lucene.replicator.ReplicationClient.SourceDirectoryFactory;
33 import org.apache.lucene.store.Directory;
34 import org.apache.lucene.store.MockDirectoryWrapper;
35 import org.apache.lucene.util.IOUtils;
36 import org.apache.lucene.util.TestUtil;
37 import org.apache.lucene.util.ThreadInterruptedException;
38 import org.junit.After;
39 import org.junit.Before;
40 import org.junit.Test;
41
42 public class IndexReplicationClientTest extends ReplicatorTestCase {
43
44 private static class IndexReadyCallback implements Callable<Boolean>, Closeable {
45
46 private final Directory indexDir;
47 private DirectoryReader reader;
48 private long lastGeneration = -1;
49
50 public IndexReadyCallback(Directory indexDir) throws IOException {
51 this.indexDir = indexDir;
52 if (DirectoryReader.indexExists(indexDir)) {
53 reader = DirectoryReader.open(indexDir);
54 lastGeneration = reader.getIndexCommit().getGeneration();
55 }
56 }
57
58 @Override
59 public Boolean call() throws Exception {
60 if (reader == null) {
61 reader = DirectoryReader.open(indexDir);
62 lastGeneration = reader.getIndexCommit().getGeneration();
63 } else {
64 DirectoryReader newReader = DirectoryReader.openIfChanged(reader);
65 assertNotNull("should not have reached here if no changes were made to the index", newReader);
66 long newGeneration = newReader.getIndexCommit().getGeneration();
67 assertTrue("expected newer generation; current=" + lastGeneration + " new=" + newGeneration, newGeneration > lastGeneration);
68 reader.close();
69 reader = newReader;
70 lastGeneration = newGeneration;
71 TestUtil.checkIndex(indexDir);
72 }
73 return null;
74 }
75
76 @Override
77 public void close() throws IOException {
78 IOUtils.close(reader);
79 }
80 }
81
82 private MockDirectoryWrapper publishDir, handlerDir;
83 private Replicator replicator;
84 private SourceDirectoryFactory sourceDirFactory;
85 private ReplicationClient client;
86 private ReplicationHandler handler;
87 private IndexWriter publishWriter;
88 private IndexReadyCallback callback;
89
90 private static final String VERSION_ID = "version";
91
92 private void assertHandlerRevision(int expectedID, Directory dir) throws IOException {
93
94
95
96 while (client.isUpdateThreadAlive()) {
97
98 try {
99 Thread.sleep(100);
100 } catch (InterruptedException e) {
101 throw new ThreadInterruptedException(e);
102 }
103
104 try {
105 DirectoryReader reader = DirectoryReader.open(dir);
106 try {
107 int handlerID = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
108 if (expectedID == handlerID) {
109 return;
110 } else if (VERBOSE) {
111 System.out.println("expectedID=" + expectedID + " actual=" + handlerID + " generation=" + reader.getIndexCommit().getGeneration());
112 }
113 } finally {
114 reader.close();
115 }
116 } catch (Exception e) {
117
118
119
120 }
121 }
122 }
123
124 private Revision createRevision(final int id) throws IOException {
125 publishWriter.addDocument(new Document());
126 publishWriter.setCommitData(new HashMap<String, String>() {{
127 put(VERSION_ID, Integer.toString(id, 16));
128 }});
129 publishWriter.commit();
130 return new IndexRevision(publishWriter);
131 }
132
133 @Override
134 @Before
135 public void setUp() throws Exception {
136 super.setUp();
137 publishDir = newMockDirectory();
138 handlerDir = newMockDirectory();
139 sourceDirFactory = new PerSessionDirectoryFactory(createTempDir("replicationClientTest"));
140 replicator = new LocalReplicator();
141 callback = new IndexReadyCallback(handlerDir);
142 handler = new IndexReplicationHandler(handlerDir, callback);
143 client = new ReplicationClient(replicator, handler, sourceDirFactory);
144
145 IndexWriterConfig conf = newIndexWriterConfig(null);
146 conf.setIndexDeletionPolicy(new SnapshotDeletionPolicy(conf.getIndexDeletionPolicy()));
147 publishWriter = new IndexWriter(publishDir, conf);
148 }
149
150 @After
151 @Override
152 public void tearDown() throws Exception {
153 publishWriter.close();
154 IOUtils.close(client, callback, replicator, publishDir, handlerDir);
155 super.tearDown();
156 }
157
158 @Test
159 public void testNoUpdateThread() throws Exception {
160 assertNull("no version expected at start", handler.currentVersion());
161
162
163 replicator.publish(createRevision(1));
164 client.updateNow();
165
166 replicator.publish(createRevision(2));
167 client.updateNow();
168
169
170 replicator.publish(createRevision(3));
171 replicator.publish(createRevision(4));
172 client.updateNow();
173 }
174
175 @Test
176 public void testUpdateThread() throws Exception {
177 client.startUpdateThread(10, "index");
178
179 replicator.publish(createRevision(1));
180 assertHandlerRevision(1, handlerDir);
181
182 replicator.publish(createRevision(2));
183 assertHandlerRevision(2, handlerDir);
184
185
186 replicator.publish(createRevision(3));
187 replicator.publish(createRevision(4));
188 assertHandlerRevision(4, handlerDir);
189 }
190
191 @Test
192 public void testRestart() throws Exception {
193 replicator.publish(createRevision(1));
194 client.updateNow();
195
196 replicator.publish(createRevision(2));
197 client.updateNow();
198
199 client.stopUpdateThread();
200 client.close();
201 client = new ReplicationClient(replicator, handler, sourceDirFactory);
202
203
204 replicator.publish(createRevision(3));
205 replicator.publish(createRevision(4));
206 client.updateNow();
207 }
208
209
210
211
212
213
214
215 @Test
216 public void testConsistencyOnExceptions() throws Exception {
217
218 replicator.publish(createRevision(1));
219 client.updateNow();
220 client.close();
221 callback.close();
222
223
224
225
226
227
228
229
230
231
232 handlerDir.setPreventDoubleWrite(false);
233
234
235 final SourceDirectoryFactory in = sourceDirFactory;
236 final AtomicInteger failures = new AtomicInteger(atLeast(10));
237 sourceDirFactory = new SourceDirectoryFactory() {
238
239 private long clientMaxSize = 100, handlerMaxSize = 100;
240 private double clientExRate = 1.0, handlerExRate = 1.0;
241
242 @Override
243 public void cleanupSession(String sessionID) throws IOException {
244 in.cleanupSession(sessionID);
245 }
246
247 @SuppressWarnings("synthetic-access")
248 @Override
249 public Directory getDirectory(String sessionID, String source) throws IOException {
250 Directory dir = in.getDirectory(sessionID, source);
251 if (random().nextBoolean() && failures.get() > 0) {
252 MockDirectoryWrapper mdw = new MockDirectoryWrapper(random(), dir);
253 mdw.setRandomIOExceptionRateOnOpen(clientExRate);
254 mdw.setMaxSizeInBytes(clientMaxSize);
255 mdw.setRandomIOExceptionRate(clientExRate);
256 mdw.setCheckIndexOnClose(false);
257 clientMaxSize *= 2;
258 clientExRate /= 2;
259 return mdw;
260 }
261
262 if (failures.get() > 0 && random().nextBoolean()) {
263 handlerDir.setMaxSizeInBytes(handlerMaxSize);
264 handlerDir.setRandomIOExceptionRateOnOpen(handlerExRate);
265 handlerDir.setRandomIOExceptionRate(handlerExRate);
266 handlerMaxSize *= 2;
267 handlerExRate /= 2;
268 } else {
269
270 handlerDir.setMaxSizeInBytes(0);
271 handlerDir.setRandomIOExceptionRate(0.0);
272 handlerDir.setRandomIOExceptionRateOnOpen(0.0);
273 }
274 return dir;
275 }
276 };
277
278 handler = new IndexReplicationHandler(handlerDir, new Callable<Boolean>() {
279 @Override
280 public Boolean call() throws Exception {
281 if (random().nextDouble() < 0.2 && failures.get() > 0) {
282 throw new RuntimeException("random exception from callback");
283 }
284 return null;
285 }
286 });
287
288
289 client = new ReplicationClient(replicator, handler, sourceDirFactory) {
290 @SuppressWarnings("synthetic-access")
291 @Override
292 protected void handleUpdateException(Throwable t) {
293 if (t instanceof IOException) {
294 if (VERBOSE) {
295 System.out.println("hit exception during update: " + t);
296 t.printStackTrace(System.out);
297 }
298 try {
299
300 DirectoryReader reader = DirectoryReader.open(handlerDir.getDelegate());
301 try {
302 int numDocs = reader.numDocs();
303 int version = Integer.parseInt(reader.getIndexCommit().getUserData().get(VERSION_ID), 16);
304 assertEquals(numDocs, version);
305 } finally {
306 reader.close();
307 }
308
309 TestUtil.checkIndex(handlerDir.getDelegate());
310 } catch (IOException e) {
311
312 throw new RuntimeException(e);
313 } finally {
314
315 failures.decrementAndGet();
316 assert failures.get() >= 0 : "handler failed too many times: " + failures.get();
317 if (VERBOSE) {
318 if (failures.get() == 0) {
319 System.out.println("no more failures expected");
320 } else {
321 System.out.println("num failures left: " + failures.get());
322 }
323 }
324 }
325 } else {
326 if (t instanceof RuntimeException) throw (RuntimeException) t;
327 throw new RuntimeException(t);
328 }
329 }
330 };
331
332 client.startUpdateThread(10, "index");
333
334 final Directory baseHandlerDir = handlerDir.getDelegate();
335 int numRevisions = atLeast(20);
336 for (int i = 2; i < numRevisions; i++) {
337 replicator.publish(createRevision(i));
338 assertHandlerRevision(i, baseHandlerDir);
339 }
340
341
342
343 handlerDir.setMaxSizeInBytes(0);
344 handlerDir.setRandomIOExceptionRate(0.0);
345 handlerDir.setRandomIOExceptionRateOnOpen(0.0);
346 }
347
348 }